{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/imports.py'\n",
    "import notebook\n",
    "import os.path, json, io, pandas\n",
    "import matplotlib.pyplot as plt\n",
    "%pylab inline\n",
    "pylab.rcParams['figure.figsize'] = (54, 60)\n",
    "\n",
    "\n",
    "from retrying import retry # for exponential back down when calling TurboOverdrive API\n",
    "\n",
    "import pyspark.sql.functions as func # resuse as func.coalace for example\n",
    "from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType,DecimalType\n",
    "\n",
    "import pandas as pandas\n",
    "from geopandas import GeoDataFrame # Loading boundaries Data\n",
    "from shapely.geometry import Point, Polygon, shape # creating geospatial data\n",
    "from shapely import wkb, wkt # creating and parsing geospatial data\n",
    "import overpy # OpenStreetMap API\n",
    "\n",
    "from ast import literal_eval as make_tuple # used to decode data from java\n",
    "\n",
    "# make sure nbextensions are installed\n",
    "notebook.nbextensions.check_nbextension('usability/codefolding', user=True)\n",
    "\n",
    "try:\n",
    "    sc\n",
    "except NameError:\n",
    "    import pyspark\n",
    "    sc = pyspark.SparkContext('local[*]')\n",
    "    sqlContext = pyspark.sql.SQLContext(sc)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# understanding de/serialization\n",
    "\n",
    "from pyspark.rdd import RDD\n",
    "\n",
    "dl = [\n",
    "    (u'2', {u'director': u'David Lean'}), \n",
    "    (u'7', {u'director': u'Andrew Dominik'})\n",
    "]\n",
    "\n",
    "dl_rdd = sc.parallelize(dl)\n",
    "java_object = dl_rdd._to_java_object_rdd()\n",
    "back_to_python = sc._jvm.SerDe.javaToPython(java_object)\n",
    "\n",
    "python_rdd = RDD(back_to_python, sc)\n",
    "python_rdd.count()\n",
    "\n",
    "back_to_java = python_rdd._to_java_object_rdd()\n",
    "and_back_to_python = sc._jvm.SerDe.javaToPython(back_to_java)\n",
    "t = RDD(and_back_to_python, sc)\n",
    "\n",
    "print t.collect()\n",
    "print back_to_java.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spatial Spark\n",
    "\n",
    "SpatialSpark aims to provide efficient spatial operations using Apache Spark. It can be used as a Spark library for spatial extension as well as a standalone application to process large scale spatial join operations.\n",
    "\n",
    "SpatialSpark has been compiled and tested on Spark 1.6.1. For geometry operations and data structures for indexes, well known JTS library is used.\n",
    "\n",
    "## Spatial Partition\n",
    "\n",
    "Generate a spatial partition from input dataset, currently Fixed-Grid Partition (FGP), Binary-Split Partition (BSP) and Sort-Tile Partition (STP) are supported.\n",
    "\n",
    "## Spatial Range Query\n",
    "\n",
    "Spatial range query includes both indexed and non-indexed query. For non-indexed query, a full scan is performed on the dataset and returns filtered results. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from shapely.geometry import MultiPolygon\n",
    "MultiPolygon([\n",
    "    Polygon([(0, 0), (0, 10), (10, 10), (10, 0)]), # A\n",
    "    Polygon([(-4, -4), (-4, 4), (4, 4), (4, -4)]), # B\n",
    "    Polygon([(7, 7), (7, 8), (8, 8), (8, 7)])      # C\n",
    "])\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from ast import literal_eval as make_tuple\n",
    "print \"Java Spark context version:\", sc._jsc.version()\n",
    "spatialspark = sc._jvm.spatialspark\n",
    "\n",
    "rectangleA = Polygon([(0, 0), (0, 10), (10, 10), (10, 0)])\n",
    "rectangleB = Polygon([(-4, -4), (-4, 4), (4, 4), (4, -4)])\n",
    "rectangleC = Polygon([(7, 7), (7, 8), (8, 8), (8, 7)])\n",
    "pointD = Point((-1, -1))\n",
    "\n",
    "def geomABWithId():\n",
    "  return sc.parallelize([\n",
    "    (0L, rectangleA.wkt),\n",
    "    (1L, rectangleB.wkt)\n",
    "  ])\n",
    "\n",
    "def geomCWithId():\n",
    "  return sc.parallelize([\n",
    "    (0L, rectangleC.wkt)\n",
    "  ])\n",
    "\n",
    "def geomABCWithId():\n",
    "  return sc.parallelize([\n",
    "  (0L, rectangleA.wkt),\n",
    "  (1L, rectangleB.wkt),\n",
    "  (2L, rectangleC.wkt)])\n",
    "\n",
    "def geomDWithId():\n",
    "  return sc.parallelize([\n",
    "    (0L, pointD.wkt)\n",
    "  ])\n",
    "\n",
    "\n",
    "dfAB = sqlContext.createDataFrame(geomABWithId(), ['id', 'wkt'])\n",
    "dfABC = sqlContext.createDataFrame(geomABCWithId(), ['id', 'wkt'])\n",
    "dfC = sqlContext.createDataFrame(geomCWithId(), ['id', 'wkt'])\n",
    "dfD = sqlContext.createDataFrame(geomDWithId(), ['id', 'wkt'])\n",
    "# Supported Operators: Within, WithinD, Contains, Intersects, Overlaps, NearestD\n",
    "SpatialOperator      = spatialspark.operator.SpatialOperator \n",
    "BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin\n",
    "\n",
    "joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Intersects(), 0.0)\n",
    "\n",
    "joinRDD.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "results = joinRDD.collect()\n",
    "map(lambda result: make_tuple(result.toString()), results)\n",
    "\n",
    "# [(0, 0), (1, 1), (2, 0)] read as:\n",
    "# ID 0 is within 0\n",
    "# ID 1 is within 1\n",
    "# ID 2 is within 0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Exercise: Find out if geometry with ID 2 overlaps geometry with ID 1\n",
    "\n",
    "# ----------------------------- solution below ----------------------------- #\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0)\n",
    "results = joinRDD.collect()\n",
    "map(lambda result: make_tuple(result.toString()), results)\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.9"
  },
  "toc": {
   "toc_cell": false,
   "toc_number_sections": true,
   "toc_threshold": 6,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
